import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
    /**
     * @description: ${description}
     * @author: Liu Jun Jun
     * @create: 2020-06-29 13:59
     **/

object TumblingWindow {
        def main(args: Array[String]): Unit = {
                val env = StreamExecutionEnvironment.getExecutionEnvironment

                val dataDS = env.socketTextStream("Desktop", 3456)

                val tsDS = dataDS.map(str => {
                val strings = str.split(",")
                (strings(0), strings(1).toLong, 1)
    }).keyBy(0)
                //窗口大小为5s的滚动窗口
                //.timeWindow(Time.seconds(5))和下面的这种写法都是可以的
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply {
            (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
                val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }")
                out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
            }
        }.print("windows:>>>")

        env.execute()
  }
}

